
Conversation

@JingsongLi (Contributor) commented Aug 17, 2020

This is a subtask of #1293

Introduce FlinkInputFormat to read records from an Iceberg table with the Flink engine. FlinkInputFormat is the foundation of the SQL reader and the streaming reader.

Implemented by:

  • Introduce RowDataIterator and DataIterator, just like Spark's BaseDataReader and RowDataReader.
  • DataIterator provides a single iterator that unions the per-FileScanTask iterators.
  • Introduce FlinkSplitGenerator, which simply wraps an Iceberg CombinedScanTask into a FlinkInputSplit.
  • Introduce FlinkInputFormat, which provides a simple API: FlinkInputFormat.builder().tableLoader(..).build() (see the usage sketch after this list).
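A minimal usage sketch of the API named above (the table location and the TableLoader factory call are illustrative assumptions; the final builder entry point discussed later in this thread may differ):

// Sketch: create a FlinkInputFormat from a TableLoader, per the description above.
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/table");  // assumed location
FlinkInputFormat inputFormat = FlinkInputFormat.builder()
    .tableLoader(tableLoader)
    .build();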

This PR is based on:

}
}

abstract CloseableIterator<T> nextTaskIterator(FileScanTask scanTask) throws IOException;
Member:

How about renaming this method to openTaskIterator ?


@Override
protected CloseableIterator<RowData> nextTaskIterator(FileScanTask task) {
// schema or rows returned by readers
Member:

nit: is this comment still valuable? I don't quite get its point.

Contributor Author:

We can delete it.

}

@Test
public void testProjection() throws Exception {
Member:

I'm not quite sure whether Flink supports complex data type projection; if it does, we may need more unit tests to cover the projection cases, such as projecting a nested struct, map, or list (similar to Spark's TestReadProjection).

Member:

Another case: Project with a new renamed schema

Contributor Author:

There is a NestedFieldsProjectableTableSource in Flink, but so far no connector has implemented it. There may be unknown risks, so I'd rather not implement it for now.

Contributor Author:

Another case: Project with a new renamed schema

You mean: create a table, insert some data, rename some fields, insert more data, and then read the table using Flink?

Member:

Not the case you described. I read the cases in TestReadProjection and thought we may also need a unit test for this one: write records into a table and then read them with another schema that has the same field ids but different field names. But in Flink we project with a List<String> of field names, so we don't have that case. We can skip it in the unit tests.

Contributor:

Field reordering tests are at the file format level. Each file format has to be able to project columns in the requested order. So any reordered schema should work as long as it is passed down correctly, which is what the new convert method does.


Map<String, Integer> indexByName = TypeUtil.indexByName(schema.asStruct());
Set<Integer> projectedIds = projectedFields.stream().map(indexByName::get).collect(Collectors.toSet());
return TypeUtil.select(schema, projectedIds);
Member:

Continuing the question from here: if we could produce an ordered & projected schema in this method (say, if this method were pruneWithReordering), then it seems we wouldn't have to convert the read RowData to the correct order here?

I'd prefer to use the correct projected schema to read the target RowData if possible, rather than reading RowData with a misordered schema and then reordering it in an iterator transformation. Because this is in the critical read path, an extra RowData transformation will cost more resources and also makes the code harder to follow.

Contributor Author:

I'd prefer to use the correct projected schema to read the target RowData if possible

This is what I want too, but I'm afraid the current format readers do not have this capability. Take a look at AvroSchemaWithTypeVisitor: the reader order follows the file schema instead of the Flink projected/expected schema.

Because this is in the critical read path and an extra RowData transformation will cost more resources

The performance is OK because we use a lazy projection in ProjectionRowData; unnecessary projections are omitted.
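Roughly, the lazy projection works like the sketch below. This is only an illustration of the index-remapping idea, not the PR's actual ProjectionRowData (Flink's RowData interface has many more accessors):

// Illustrative only: keep the source row and a projected->source position mapping;
// values are looked up on access, so no per-row copy is materialized.
class LazyRowProjection {
  private final Object[] sourceRow;  // row as produced by the file reader
  private final int[] positionMap;   // projected position -> source position

  LazyRowProjection(Object[] sourceRow, int[] positionMap) {
    this.sourceRow = sourceRow;
    this.positionMap = positionMap;
  }

  Object get(int projectedPos) {
    return sourceRow[positionMap[projectedPos]];  // resolved lazily per field access
  }
}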

Member:

cc @rdblue, mind taking a look at this? Thanks.

Contributor (@rdblue, Aug 20, 2020):

You should be able to use the expected/projection schema. All readers should reorder columns to produce the requested column order.

AvroSchemaWithTypeVisitor is used to traverse the file schema to create the reader structure, but that's because fields in Avro must be read in the file's order. But when that reader adds data columns to records, the values are put in the correct order because the ResolvingDecoder returns the correct position in the projection schema.

Contributor Author:

Thanks @openinx and @rdblue! I'm very happy to have my confusion resolved. I will do it.

Contributor Author:

It doesn't matter if we change the order of the schema, as long as the ID doesn't change.

@rdblue (Contributor) commented Aug 20, 2020

@JingsongLi, now that #1332 has been merged, could you update this one? It looks like this one is next to review.

@JingsongLi (Contributor Author):

Hi @openinx and @rdblue, I rebased, added a test case, and fixed a bug.
I think this is ready for review.

@openinx (Member) commented Aug 21, 2020

Fine, I will take a look.

* @return a Schema corresponding to the Flink projection
*/
public static Schema projectWithReordering(Schema schema, List<String> projectedFields) {
if (projectedFields == null) {
Member:

nit: projectedFields == null || projectedFields.length == 0

Contributor Author:

projectedFields.length == 0 means projecting an empty set of columns, but projectedFields == null means projecting all columns.
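To make the distinction concrete, a sketch of the intended semantics using the TypeUtil calls shown in the diff above (simplified; not the PR's exact body):

// null: no projection was pushed down, so keep every column of the table schema.
// empty list: a legitimate projection of zero columns, so it must not be short-circuited.
static Schema projectWithReordering(Schema schema, List<String> projectedFields) {
  if (projectedFields == null) {
    return schema;
  }
  Map<String, Integer> indexByName = TypeUtil.indexByName(schema.asStruct());
  Set<Integer> projectedIds = projectedFields.stream().map(indexByName::get).collect(Collectors.toSet());
  return TypeUtil.select(schema, projectedIds);
}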


/**
* Project columns from a {@link Schema} using a projected fields.
*
Member:

We may need to add a comment indicating that we don't support complex data type projection for Flink now.

Contributor Author:

Added a comment: don't support nested fields projection for Flink now.

Contributor:

Why aren't nested fields supported?

Contributor Author:

Flink SQL did not have good support for nested projection before. I'll verify it and try to implement nested fields projection.

* @param projectedFields projected fields from Flink
* @return a Schema corresponding to the Flink projection
*/
public static Schema projectWithReordering(Schema schema, List<String> projectedFields) {
Member:

We already have Schema#select; will it fit your requirement?

  /**
   * Creates a projection schema for a subset of columns, selected by name.
   * <p>
   * Names that identify nested fields will select part or all of the field's top-level column.
   *
   * @param names a List of String names for selected columns
   * @return a projection schema from this schema, by name
   */
  public Schema select(Collection<String> names) {
    return internalSelect(names, true);
  }

Contributor Author:

You can take a look at Schema.internalSelect: it selects with a Set<Integer>, so the interface is actually a "selectWithoutReordering".
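A small illustration of the difference (the schema contents are assumed for the example):

// Schema#select picks columns by id, so the result keeps the table's column order:
Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.optional(2, "data", Types.StringType.get()));

Schema selected = schema.select(Arrays.asList("data", "id"));
// selected comes back as [id, data] -- table order, not the requested [data, id].
// Flink's projection must preserve the requested order, hence the separate reordering logic.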

Member:

OK, that sounds reasonable.

Contributor:

We do this in Spark here: https://github.com/apache/iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java#L161-L166

You might try a similar approach since we already have Flink-to-Iceberg conversion. It should just be a matter of reassigning the IDs. You may also need the type fixes; I don't recall whether Iceberg-to-Flink-to-Iceberg conversion is lossy.
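A hedged sketch of that approach adapted to Flink (FlinkSchemaUtil.convert and TypeUtil.reassignIds are assumed to play the same roles as in the linked SparkSchemaUtil code; a type-fixup pass may also be needed, as discussed below):

// Convert the requested Flink schema to Iceberg (fresh, positional ids), then reuse
// the table's field ids by matching names against the table schema.
static Schema toRequestedProjection(Schema tableSchema, TableSchema requestedFlinkSchema) {
  Schema converted = FlinkSchemaUtil.convert(requestedFlinkSchema);
  return TypeUtil.reassignIds(converted, tableSchema);
}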

Contributor Author:

You may also need the type fixes

Yes, we can have them.

After thinking about it together with nested projection, we can pass a TableSchema requestedSchema parameter, which contains the required projection (including nested fields) and the types to be fixed.

Contributor Author:

Only UUID needs to be fixed...

Contributor Author:

Created #1382 for the fixup.

* under the License.
*/

package org.apache.iceberg.flink.source;
Member:

Here you put all reader-related classes inside the source package; will we also need to put the writer-related classes into a sink package? I don't feel strongly about it, but keeping things consistent is OK for me.

Contributor Author:

will we also need to put those writer related classes into sink package ?

I think we can.

case BINARY: // byte[]
return ByteBuffers.toByteArray((ByteBuffer) value);
case TIME: // int instead of long
return (int) (DateTimeUtil.timeFromMicros((Long) value).toNanoOfDay() / 1000_000);
Member:

Is the long value surely microseconds? Could we just return (int) ((long) value / 1_000)?
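For reference, the two conversions agree when the value is microseconds of the day (DateTimeUtil usage as in the diff above):

long micros = 12_345_678L;  // example value in microseconds of day
int viaLocalTime = (int) (DateTimeUtil.timeFromMicros(micros).toNanoOfDay() / 1_000_000);
int direct = (int) (micros / 1_000);
// both yield 12_345 milliseconds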

public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
// Called in Job manager, so it is OK to load table from catalog.
tableLoader.open(serializableConf.get());
try (TableLoader loader = tableLoader) {
Member:

Hmm, it seems I missed closing the TableLoader in the IcebergFilesCommitter patch.

StructReader(List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
super(readers, struct, idToConstant);
this.numFields = readers.size();
this.numFields = struct.fields().size();
Contributor Author (@JingsongLi, Aug 24, 2020):

This fixes a Flink ORC reader bug (with partitions).

Member:

Nice catch: if the schema is a projected read schema, then numFields will be mismatched.

this.reader = OrcSchemaWithTypeVisitor.visit(iSchema, readSchema, new ReadBuilder(idToConstant));
}

public static OrcRowReader<RowData> buildReader(Schema schema, TypeDescription readSchema) {
Contributor:

Was this not used anywhere?

Contributor Author:

Because there are two constructors, we would need two static helpers; I think we can use the constructors directly.

Contributor Author:

It is just used by testing code.


// The hive ORC writer may adjust the scale of decimal data.
Preconditions.checkArgument(value.precision() <= precision,
"Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value);
Contributor:

Does Flink require this?

Contributor Author:

I think it is better to add this check to avoid potential precision-mismatch bugs.

return ByteBuffers.toByteArray((ByteBuffer) value);
case BINARY: // byte[]
return ByteBuffers.toByteArray((ByteBuffer) value);
case TIME: // int instead of long
Contributor:

Nit: int in millis?

*/
public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit> {

private static final long serialVersionUID = 1L;
Contributor:

Is this needed?

Contributor Author (@JingsongLi, Aug 26, 2020):

For a streaming job, I think it's better to have it. Suppose a job is running in the cluster and the user later upgrades the Iceberg-Flink version. If the new version changes something that does not affect compatibility but still changes the derived serialVersionUID, the user's job would be reported as incompatible after the cluster upgrade, even though it is in fact compatible.
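In other words, pinning the UID decouples Java's stream compatibility check from incidental class changes; a generic illustration (the class name is hypothetical):

// Without an explicit serialVersionUID the JVM derives one from the class shape, so even a
// compatible change (e.g. adding a public method) would break deserialization of old instances.
public class ExampleInputFormatState implements java.io.Serializable {
  private static final long serialVersionUID = 1L;  // pinned across compatible changes
}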

Contributor:

Is Flink using Java serialization across versions? That seems like a big risk to me. I'd prefer to only use Java serialization between processes running the exact same version of Iceberg. If we need to serialize across versions (like for checkpoint data) then I think we should worry about compatibility a lot more.

return this;
}

public ScanOptions build() {
Member:

Q: do you think there's a need to abstract a common options builder shared between Flink and Spark (maybe also Hive/Pig) to validate and build those properties into a ScanOptions? If so, we could do that in a separate PR.

Contributor Author:

I don't know; there are some differences between Flink/Spark and Hive. Maybe we can try to do something in Spark.


@Test
public void testIdentityPartitionProjections() throws Exception {
Schema logSchema = new Schema(
Member:

Contributor Author:

There is testPartitionTypes for this.

@JingsongLi (Contributor Author):

Thanks @rdblue and @openinx for the review. I have added project(TableSchema schema) and select(List<String> fields) to the format builder; project can support nested projection.
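A hedged usage sketch of the two builder methods named above (the FlinkSource.forRowData() entry point and buildFormat() name are assumed from elsewhere in this thread; tableLoader and projectedTableSchema are placeholders):

// project: a full TableSchema, which may include nested fields (the InputFormat path).
FlinkInputFormat withProjection = FlinkSource.forRowData()
    .tableLoader(tableLoader)
    .project(projectedTableSchema)
    .buildFormat();

// select: top-level field names in the requested output order (what SQL pushes down).
FlinkInputFormat withSelection = FlinkSource.forRowData()
    .tableLoader(tableLoader)
    .select(Arrays.asList("data", "id"))
    .buildFormat();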

TableSchema.Builder builder = TableSchema.builder();
for (String field : selectedFields) {
TableColumn column = tableSchema.getTableColumn(field).orElseThrow(() -> new IllegalArgumentException(
"The fields are illegal in projectedFields: " + selectedFields));
Member:

I think we need to point out which column is missing in the error message.

Comment on lines 189 to 202
public Builder icebergSchema(Schema newSchema) {
this.icebergSchema = newSchema;
return this;
}

public Builder io(FileIO newIO) {
this.io = newIO;
return this;
}

public Builder encryption(EncryptionManager newEncryption) {
this.encryption = newEncryption;
return this;
}
Member:

Do we really need those three methods? I saw that we loadTable and override all three of them if any one is null, which means setting only one or two of them won't work in this builder.

Member:

For me, table(Table newTable) and tableLoader(TableLoader newLoader) are enough.

Contributor Author:

OK I can remove them.

encryption = table.encryption();
}

if (projectedSchema != null && selectedFields != null) {
Contributor:

Is this needed? If this were delegated to when the scan is built, the scan would do the check and users would get consistent exception messages.

Contributor Author:

See below comments.

field, tableSchema)));
builder.field(column.getName(), column.getType());
}
projectedTableSchema = builder.build();
Contributor:

Why project the Flink schema manually rather than using icebergSchema.select(selectedFields) and converting the result?

Contributor Author (@JingsongLi, Sep 15, 2020):

I think we should remove this select, because what Flink SQL wants is an order-changing select, not the original Iceberg table order.
We should provide a unified select, so I think we can provide it on the Flink side for now, and we can use project.

return this;
}

public Builder project(TableSchema schema) {
Contributor:

Will SQL use this or select?

Contributor Author (@JingsongLi, Sep 15, 2020):

SQL uses the order-changing select, because SQL does not support nested field push-down now.
But I think we should support nested push-down in the future, and then it should use project.

If we provided a select(Set<String>) in the original table schema order, like TableScan.select, I think SQL could not use that one.

return env;
}

RowDataTypeInfo getRowTypeInfo() {
Contributor:

Since these are intended to be called from child classes, should they be protected?

public DataStream<RowData> build() {
Preconditions.checkNotNull(getEnv(), "StreamExecutionEnvironment should not be null");
FlinkInputFormat format = buildFormat();
return getEnv().createInput(format, getRowTypeInfo());
Contributor:

How is this different than an unbounded builder? I don't see anything that passes whether the stream should be bounded or unbounded. It seems like this should pass that information so that the input adapter can plan the current table scan, rather than checking for new data later.

Contributor:

Not having endSnapshot in ScanOptions seems to imply that the scan is unbounded.

Contributor Author (@JingsongLi, Sep 15, 2020):

I was thinking about adding a new config option like a bounded flag.
After reading your comments, I think this is very good. Because streaming jobs are very likely to set a starting snapshot ID or timestamp, the strategy can be (sketched after this list):

  • Without startSnapshotId: bounded
  • With startSnapshotId and with endSnapshotId: bounded
  • With startSnapshotId (-1 means unbounded preceding) and without endSnapshotId: unbounded

This way we can have a unified builder.
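The rule from the list above as a small sketch (method and parameter names assumed, not the PR's exact code):

// Bounded unless a start snapshot is given without an end snapshot.
static boolean isBounded(Long startSnapshotId, Long endSnapshotId) {
  if (startSnapshotId == null) {
    return true;                 // no start: plan the current table scan once
  }
  return endSnapshotId != null;  // start + end: bounded range; start only: unbounded stream
}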

Contributor:

Sounds like a good way to configure this to me, except that we will want to make sure the default is reasonable. For Flink, should the default be unbounded or bounded?

Contributor Author:

I prefer bounded as the default.

  • Bounded mode is more common when users are trying things out and getting started.
  • In unbounded mode, users usually define startSnapshotId, but in bounded mode endSnapshotId is rare. If the default were unbounded, it would be hard to express bounded mode.

return caseSensitive;
}

public Long getSnapshotId() {
Contributor:

I'm not sure if this class should conform to the typical style used by Flink or Iceberg, but in Iceberg, we omit get from getter names because it doesn't add any helpful context and is awkward in non-Java languages where getter methods are named for fields.

Table table, List<String> projectFields, CatalogLoader loader, Long snapshotId, Long startSnapshotId,
Long endSnapshotId, Long asOfTimestamp, List<Expression> filters, String sqlFilter) throws IOException {
ScanOptions options = ScanOptions.builder().snapshotId(snapshotId).startSnapshotId(startSnapshotId)
.endSnapshotId(endSnapshotId).asOfTimestamp(asOfTimestamp).build();
Contributor:

I think the tests would be much more readable if the ScanOptions builder were used directly. For example, this is hard to understand:

assertRecords(executeWithOptions(table, null, null, null, snapshotId1, null, null, null, null), expected1, SCHEMA);

But you could rewrite that like this:

ScanOptions options = ScanOptions.builder().startSnapshotId(snapshotId1).build();
assertRecords(executeWithOptions(table, options), expected1, SCHEMA);

In addition, passing null into this leaks the default state within the builder into the tests: test authors need to know that passing null for CatalogLoader is supported. I think it is better to let the test authors use the builder pattern.

Contributor Author:

I agree with you.
I was thinking about how to make it easy for SQL tests to reuse this. SQL tests can also rebuild SQL strings from ScanOptions.

long timestampMillis = table.currentSnapshot().timestampMillis();

// produce another timestamp
Thread.sleep(10);
Contributor:

We usually introduce a spin to avoid sleeping for long durations in lots of tests, like this: https://github.com/apache/iceberg/blob/master/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java#L78
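The spin in the linked test is essentially this (helper name from TestRemoveSnapshots; shown here as a sketch):

// Busy-wait until the clock has passed the given timestamp, instead of a fixed Thread.sleep.
static void waitUntilAfter(long timestampMillis) {
  while (System.currentTimeMillis() <= timestampMillis) {
    // spin
  }
}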

* The Flink SQL output: [f2, data]
* The FlinkInputFormat output: [nested[f2], data].
*/
protected abstract void assertNestedProjection(Table table, List<Record> records) throws IOException;
Contributor:

I find it really strange that this is delegated to a subclass, given that it builds a very specific nested projection.

Why not make this use a method like execute(Table, List<String>), but pass in the projection instead of a list of fields?

Then you could keep all of the schema details in the test method here, rather than delegating this assertion. I think it would be cleaner.

Contributor Author:

I think we can move testNestedProjection to TestFlinkInputFormat, because only the InputFormat supports nested push-down; SQL cannot.

* But the FlinkInputFormat can't.
*/
protected abstract void assertResiduals(Schema schema, List<Row> results, List<Record> writeRecords,
List<Record> filteredRecords) throws IOException;
Contributor:

Why is this implemented by the subclass? Couldn't this just call assertRecords directly?

Contributor Author:

This is also because of the difference between InputFormat and SQL (SQL can actually filter out the data). I think I should put it into the SQL subclass.

@rdblue merged commit 1a797cd into apache:master on Sep 25, 2020
@rdblue (Contributor) commented Sep 25, 2020

I'm going ahead and merging this because I think the remaining things are minor, but we will probably want to fix them.

First, breaking the tests across two classes could be cleaner. In the future, I'd recommend writing tests and not breaking across a parent/child class until you want to add the other set of tests. That way it is easier to see what is needed and will be clean.

Second, I don't understand the benefit of having a separate ScanOptions class, besides that it can pull options out of a map. It seems to me that it would be simpler to just have the FlinkSource.Builder class and move all of the scan options to that class.

@JingsongLi (Contributor Author):

Thanks @rdblue for the review; I'll address your test comments in the next PR (integrating with SQL).
The reason for a separate ScanOptions is that for the SQL layer, ScanOptions holds the options from CREATE TABLE ... WITH (options), while the other parameters of FlinkSource.Builder do not. This means the parameters in ScanOptions must be expressible as strings.
If you don't think it is necessary, we can also merge it into the builder.

@rdblue (Contributor) commented Sep 25, 2020

Let's take a look at ScanOptions in the next PR then. I would prefer to keep user-facing APIs simple, rather than leaking a SQL concern (options come from WITH) to users (need to use two builders). Since SQL will most likely use the fromProperties method, it may make sense to use a single builder, add withProperties, and pass properties from SQL as a map.

*
* @return {@link Builder} to connect the iceberg table.
*/
public static Builder forRowData() {


Hi @JingsongLi, I've been testing Iceberg recently. Since StreamExecutionEnvironment is a must-have parameter for FlinkSource, would it be better to put it in the builder's constructor instead of FlinkSource.forRowData().env(xx)?

Just a minor user-experience improvement.

Contributor Author (@JingsongLi, Oct 19, 2020):

I think it is better to keep the builder pattern. And for now, we can also create a FlinkInputFormat without an env.


I see... The builder is for both InputFormat and DataStreamSource.

@JingsongLi deleted the format branch on November 5, 2020 09:42
@rdblue added this to the Java 0.10.0 Release milestone on Nov 16, 2020